iT邦幫忙

2024 iThome 鐵人賽

DAY 19
0
Software Development

Django 2024: 從入門到SaaS實戰系列 第 27

Django Channels、Async 和 Celery 的協同之舞: 認識向量資料與Celery

  • 分享至 

  • xImage
  •  

我們再來回顧一次我們的需求:

  1. 需要用戶能夠上傳PDF建立文章
  2. 在儲存文章,或是更新文章時能夠將文章內容轉換成向量資料
  3. 用戶輸入想問的問題,知識庫根據問題找到向量接近的文檔
  4. 將文檔與問題交給大型語言模型(LLM),最終返回答案給用戶

那我們Django部分的主角Celery需要做什麼?

首先,如果儲存與更新文章一定需要時間去解析,並且在知識庫拿到用戶問題,最終返回答案的過程也是需要時間去運行相關排程。因此為了不阻塞主進程,需要有額外的進程來處理這些子任務,而這也就是Celery的功能

今日重點:

  • 介紹向量資料的處理流程
    • 輕量的向量資料庫:Chroma
    • 統一 AI 應用開發框架:LangChain
    • 向量資料處理流程
  • Celery簡介
    • 配置Celery
    • 測試Celery

介紹向量資料的處理流程

為什麼這個專案需要處理向量資料,而向量資料庫又能做到什麼?

如果根據我們之前建立的資料來進行查詢,我們如果輸入LLM就只能查到包含LLM這個字符串的結果,但是我們的目標可能是ChatGPT、Claude…等等。

而向量資料中每一個向量對應一個物件或是項目,如果能夠準確的建立這些向量,那麼我們就能夠過語意來找尋相似的單字而不是根據單字的名稱來尋找

https://ithelp.ithome.com.tw/upload/images/20241011/20161866kTZOBWYRFN.png

圖源:https://www.pinecone.io/learn/series/nlp/dense-vector-embeddings-nlp/

當我們要儲存資料到向量資料庫時,可以設定不同的算法,來決定資料的維度:最後儲存的向量資料

輕量的向量資料庫:Chroma

如果是使用Python或是javaScript開發,並且追求開源且快速開發的話,使用Chroma是很好的選擇

https://github.com/chroma-core/chroma

具備以特點:

  • 開源:Chroma是一個開源項目,可以自由使用和修改
  • 輕量級:它設計得非常輕量,容易安裝和使用,適合快速原型開發和小型項目
  • 嵌入式:Chroma可以嵌入到Python應用中,不需要額外的服務器或基礎設施
  • 支持多種向量:可以存儲和檢索不同類型的向量,如文本、圖像或自定義向量
  • 靈活的索引:支持多種索引方法,可以根據需求選擇最適合的索引策略
  • API簡單:提供簡潔直觀的API,易於集成和使用

因為在這個專案中,我們想要快速建立相關的應用且考慮到與Python的高度適配性,因此選擇Chroma

統一 AI 應用開發框架:LangChain

現在大型語言模型如雨後春筍的出現,而各家模型又有各自的API,並且剛剛提到過的向量資料庫也是百百種。如果有一個框架,能夠統整出相關的API,讓我們能夠隨時替換掉LLM或是向量資料庫而不需要重新開發,在開發上我們就只需要專注在邏輯實現,以及了解不同LLM的特色就好。

LangChain 的核心理念正是為了解決該問題而設計的。它提供了一個抽象層,使得開發者可以輕鬆切換不同的 LLM 和向量數據庫,而無需大幅度修改程式碼

LangChain具備以下特點:

  1. 模型抽象

LangChain 提供了統一的接口來與不同的 LLM 交互。無論是 OpenAI 的 GPT還是開源的 LLaMA,都可以通過相同的 API 調用

  • 可以輕鬆切換或測試不同的 LLM
  • 只需更改模型配置,程式碼其餘部分保持不變
  1. 向量存儲抽象

LangChain 為各種向量資料庫(如 Chroma、Pinecone等)提供了統一的接口

  • 可以根據需求選擇最適合的向量存儲
  • 在開發過程中可以輕鬆切換或比較不同的存儲方案
  1. 鏈式操作

LangChain 引入了鏈(chain)的概念,允許將多個操作組合成一個流程

  • 可以輕鬆地將文檔加載、文本分割、向量化、存儲和查詢等步驟連接起來
  • 這種模塊化的方法使得替換單個組件變得簡單
  1. 提示模板

LangChain 提供了強大的提示(prompt)管理功能

  • 可以創建和管理複雜的提示模板
  • 輕鬆調整和優化提示,而不影響應用的其他部分
  1. 記憶管理

LangChain 提供了多種記憶(memory)組件

  • 可以為 AI 應用添加短期或長期記憶
  • 支持多種記憶類型,如對話歷史、摘要記憶等

通過提供這些抽象和工具,LangChain 讓開發者能夠:

  1. 專注於應用邏輯而不是底層實現
  2. 靈活切換和實驗不同的 LLM 和向量存儲
  3. 快速構建複雜的 AI 應用
  4. 優化和擴展應用而無需大幅重構

這種方法大大降低了 AI 應用開發的門檻,同時提高了開發效率和應用的可維護性

但是因為LLM應用的開發迭代速度很快,也因此LangChain的相關套件也是很常浮動

雖然會說LangChain本身像是瑞士刀一樣的方便,但是現實是這個工具隨時都在改變

雖然核心概念沒變,但是方法的名稱、參數調用的名稱等等都有很多變化

加上許多方式沒有一個非常固定統一的邏輯,很容易在學習沒多久後之前用的方法都被棄用了,套件的迭代速度也很快。因此也有人比較著重在如何使用prompt來達成目的,API還是自己架設

在這個專案中不會著重在LangChain非常強大的Chain與prompt template上,只會做非常基本的操作

許多專案中使用的方法,即使是第一次看到也能

向量資料處理流程

在進到開發功能之前,說明一下資料處理的流程:

文章儲存或是更新時:

  1. 向量資料庫先檢查有沒有該文章對象的相關文檔,如果有的話進行刪除
  2. 將文章內容切割成一小塊一小塊的chunk,並且為了保持上下文完整性,需要前後多保留一段文本。避免返回文本時缺乏關鍵訊息
  3. 將chunks存入向量資料庫

用戶輸入資料時:

  1. 先將問題也向量化,然後找到幾筆語意最接近的chunks
  2. 拿著這些chunks與問題輸入LLM中,讓LLM返回對應的答案

Celery簡介

Celery是一個分散式任務隊列系統,用於處理異步任務和定時任務

整個系統的組成部件如下:

  • Task(任務):要被執行的工作單元,任務可以分成定時任務與非同步任務兩種
  • Worker(工作者):執行任務的進程,從Broker獲取任務並執行
  • Broker(消息代理):儲存與傳遞資訊的中間人,接收任務並將其儲存在隊列中,常見的Broker選項包含Redis與RabbitMQ等
  • Queue(隊列):存儲待執行任務的數據結構,存在於Broker中
  • Backend(結果後端):儲存任務結果

運作流程如下:

[Client] --發送任務--> [Broker/Queue] --分發任務--> [Worker]
[Worker] --執行任務並存儲結果--> [Backend] --獲取結果-->[Client]

那為什麼一開始會說Celery是一個分散式任務系統,因為我們可以藉由建立多個Worker來達到多進程的架構,當發布任務時,主進程(Django繼續做他的事情)而任務則是交給Celery來處理

同時也能給Celery配置的多線程的設計,來讓Celery能夠負擔繁重的任務

配置Celery

  • 安裝所需套件
poetry add celery
poetry add django-celery-beat # 在這次專案用不到,是處理定期任務時才需要
poetry add django-celery-results # 需要儲存任物結果時安裝
poetry add flower # 需要監控celery任務的狀態時安裝,本次專案不會著墨
  • 基本配置
# settings.py

INSTALLED_APPS = [
    ...
    "django_celery_results",
    ...
]

# celery配置
CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0"
CELERY_TIMEZONE = "Asia/Taipei"

# celery結果存儲
CELERY_RESULT_BACKEND = "django-db"
# celery消息格式配置
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"

# celery 超時時間
CELERYD_TASK_TIME_LIMIT = 60 * 10

# celery 儲存結果的過期時間 默認1天過期 如設為0則永不過期
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# celery 任務限流
CELERY_TASK_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}

# 每個worker執行多少任務後會被殺死,預防記憶體泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100

# 消除警告
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
  • 在跟settings.py同級目錄下建立celery.py與修改__init__.py
# celery.py

from celery import Celery
from django.conf import settings

import os

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "documind.settings")

# create a new Celery instance
app = Celery("documind", broker=settings.CELERY_BROKER_URL)

# Using a string here means the worker doesn't have to serialize
app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# __init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)
  • 在celery.py中,設置一個新的celery實例對象,並且將settings.py中的配置配置在該實例上,此外使用autodiscover_tasks方法,可以自動偵測所有app下有配置@shared_task裝飾器的任務

  • 在__init__中,確認專案啟動時能夠正確配置到該celery實例

  • 遷移celery_result相關的表格到資料庫

python3 manage.py makemigrations
python3 manage.py migrate
  • 啟動worker,開啟另一個終端機,在manage.py同級目錄下輸入指令(記得要進入poetry shell)
 Celery -A documind worker -l info

https://ithelp.ithome.com.tw/upload/images/20241011/201618668KiG6ptruW.png

測試Celery

那我們這邊先確認配置的celery是否能夠成功運作

  • 在documind目錄下新增tasks.py
    需要加上裝飾器,讓Celery能夠找到任務
from celery import shared_task
import time

@shared_task(max_retries=3)
def add_number(x, y):
    time.sleep(10)
    return x + y
  • 建立測試的視圖,在documind目錄下建立views.py
from django.http import HttpResponse
from .tasks import add_number

def task_view(request):
    task_result = add_number.apply_async((1, 2))
    return HttpResponse(
        f"{task_result.id} is the task id<br />result status: {task_result.status}"
    )

這邊用了apply_async方法來啟動任務,在Celery中有幾中方式來讓worker執行任務:

delay與apply_async,兩種方式的區別在於後者可以更多的設置執行任務的細節

https://ithelp.ithome.com.tw/upload/images/20241011/20161866HbahEZylM2.png

圖源:https://docs.celeryq.dev/en/latest/userguide/calling.html#basics

  • 配置路由
# documind.urls.py

urlpatterns = [
    path("task/", views.task_view, name="task"),
    ...
]

  • 重啟Worker,因為他沒辦法偵測程式碼改動

確認有真的接收到該任務

[tasks]
  . documind.tasks.add_number

當我們嘗試調用該路由時,可以在畫面看到

21004792-9d03-4505-a4c9-4d1fe22a78fe is the task id
result status: PENDING

而啟動Celery的終端機

[2024-10-11 19:48:38,082: INFO/MainProcess] Task documind.tasks.add_number[21004792-9d03-4505-a4c9-4d1fe22a78fe] received
[2024-10-11 19:48:48,137: INFO/MainProcess] Task documind.tasks.add_number[21004792-9d03-4505-a4c9-4d1fe22a78fe] succeeded in 10.055313165998086s: 3

我們可以知道幾件事情:

  1. 因為是透過另外一個進程(worker)透過非同步方式完成任務,因此我們不可能在執行任務的當下,就拿到任務結果
  2. 如果此時去檢查task_result這個返回值:會是AsyncResult 對象
  3. 因此常見的作法是返回id,然後再透過該id去拿到最終的任務結果

確認我們的Celery本身配置沒問題後,就進到專案開發吧

今日總結

  • 我們初步了解了向量資料在我們專案中所扮演的關鍵角色
  • 了解在專案上,需要LangChain這樣的整合框架來幫助我們建立AI應用
  • 使用輕量的Chroma來當作專案的向量資料庫
  • 認識Celery這個分散式任務系統,並進行基本配置,最後確認配置能順利執行非同步任務

下一篇就把以下功能一一完成:

  • 建立PDF上傳與新增文章功能
  • 安裝LangChain相關套件
  • 配置向量資料庫
  • 建立文章內容轉換成向量資料排程
  • 建立用戶輸入問題並返回答案流程

參考資料

  • langChain:https://aict.nkust.edu.tw/digitrans/?p=5466
  • 向量資料庫:https://cloud.tencent.com/developer/article/2312534
  • Chroma:https://github.com/chroma-core/chroma
  • Celery:

上一篇
Django Channels、Async 和 Celery 的協同之舞: DocuMind專案介紹
下一篇
Django Channels、Async 和 Celery 的協同之舞: 打造智能文檔問答系統
系列文
Django 2024: 從入門到SaaS實戰31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言